package com.atguigu.realtime.apps

import java.util
import com.alibaba.fastjson.{JSON, JSONArray, JSONObject}
import com.atguigu.common.constants.TopicConstant
import com.atguigu.common.utils.KafkaClientUtil
import com.atguigu.realtime.utils.DStreamUtil
import com.google.gson.Gson
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}


object LogDiversionApp extends BaseApp {
  override var appName: String = "LogDiversionApp"
  override var groupId: String = "realtime220212"
  override var topic: String = TopicConstant.ORIGINAL_LOG
  override var batchDuration: Int = 10

  /**
   * Explodes a JSON-array field of one log record and sends every element to
   * Kafka as an individual message, enriched with the record's common fields.
   *
   * @param map     the full parsed log record
   * @param logType must be "actions" or "displays" (name of the array field)
   * @param result  shared fields (common + page + top-level ts) merged into each element
   * @param topic   destination Kafka topic for the exploded messages
   * @param gson    per-partition Gson instance used for serialization
   */
  def parseJsonArray(map: JSONObject,logType: String, result: util.Map[String, AnyRef], topic: String,gson:Gson):Unit = {

    // Extract the raw "actions"/"displays" array string and parse it.
    val arrayStr: String = map.getString(logType)
    val jSONArray: JSONArray = JSON.parseArray(arrayStr)

    // Holds the element's own ts for action logs: merging `result` below would
    // otherwise clobber it with the page-level ts.
    var ts: AnyRef = null

    for (i <- 0 until jSONArray.size()) {

      // Each array element becomes its own mutable map.
      val eleMap: util.Map[String, AnyRef] = JSON.parseObject(jSONArray.getString(i)).getInnerMap

      // Action events carry their own timestamp; remember it before the merge.
      if (logType.equals("actions")) {
        ts = eleMap.get("ts")
      }

      // Merge the shared fields in (may overwrite "ts" with the page-level one).
      eleMap.putAll(result)

      // Restore the action's own ts so the emitted record keeps the event time.
      if (logType.equals("actions")) {
        eleMap.put("ts", ts)
      }

      KafkaClientUtil.sendDataToKafka(topic, gson.toJson(eleMap))
    }
  }

  /**
   * Classifies every raw log line in the batch and routes it to the matching
   * Kafka topic:
   *   - "start" (without "err")       -> STARTUP_LOG
   *   - "err"                         -> ERROR_LOG (with start or page context)
   *   - otherwise (page logs)         -> PAGE_LOG, plus exploded ACTION_LOG /
   *                                      DISPLAY_LOG records
   *
   * Runs entirely on the executors via foreachPartition; one Gson instance is
   * created per partition to avoid per-record allocation.
   */
  def parseLog(rdd: RDD[ConsumerRecord[String, String]]):Unit = {

    val rdd1: RDD[String] = rdd.map(_.value())

    rdd1.foreachPartition(partition => {

      // One Gson per partition (Gson is not serializable across the closure boundary).
      val gson = new Gson()

      partition.foreach(logStr => {

        val map: JSONObject = JSON.parseObject(logStr)

        // Every log type shares the "common" section; start from it.
        val commonStr: String = map.getString("common")
        val resultMap: JSONObject = JSON.parseObject(commonStr)

        // Carry the record-level timestamp alongside the common fields.
        resultMap.put("ts", map.getLong("ts"))

        val result: util.Map[String, AnyRef] = resultMap.getInnerMap

        /*
          Dispatch on the record's type and finish the per-type enrichment:
            start log : "start" present, no "err"
            error log : "err" present; context comes from "start" or "page"
            page log  : everything else; "displays"/"actions" are exploded
         */
        if (map.containsKey("start") && !map.containsKey("err")) {

          // Startup log: merge the "start" section and emit.
          val startMap: util.Map[String, AnyRef] = JSON.parseObject(map.getString("start")).getInnerMap
          result.putAll(startMap)
          KafkaClientUtil.sendDataToKafka(TopicConstant.STARTUP_LOG, gson.toJson(result))

        } else if (map.containsKey("err")) {

          // Error log: merge the error details, then whichever context exists.
          val errMap: util.Map[String, AnyRef] = JSON.parseObject(map.getString("err")).getInnerMap
          result.putAll(errMap)

          if (map.containsKey("start")) {

            // Error raised during startup.
            val startMap: util.Map[String, AnyRef] = JSON.parseObject(map.getString("start")).getInnerMap
            result.putAll(startMap)

          } else {

            // Error raised on a page. NOTE(review): if an "err" record carries
            // neither "start" nor "page", getString("page") is null and
            // parseObject(null).getInnerMap throws an NPE — confirm the log
            // producer guarantees one of the two is always present.
            val pageMap: util.Map[String, AnyRef] = JSON.parseObject(map.getString("page")).getInnerMap
            result.putAll(pageMap)

            // Keep the raw arrays as strings inside the error record (not exploded).
            if (map.containsKey("actions")) {
              result.put("actions", map.getString("actions"))
            }
            if (map.containsKey("displays")) {
              result.put("displays", map.getString("displays"))
            }
          }

          KafkaClientUtil.sendDataToKafka(TopicConstant.ERROR_LOG, gson.toJson(result))

        } else {

          // Page log: merge the "page" section and emit the page record.
          val pageMap: util.Map[String, AnyRef] = JSON.parseObject(map.getString("page")).getInnerMap
          result.putAll(pageMap)
          KafkaClientUtil.sendDataToKafka(TopicConstant.PAGE_LOG, gson.toJson(result))

          // Explode the embedded arrays into individual action/display records.
          if (map.containsKey("actions")) {
            parseJsonArray(map, "actions", result, TopicConstant.ACTION_LOG, gson)
          }
          if (map.containsKey("displays")) {
            parseJsonArray(map, "displays", result, TopicConstant.DISPLAY_LOG, gson)
          }
        }
      })

      // FIX: flush must run HERE, inside the partition closure, so it executes
      // on the executor that owns the producer buffers. The previous call site
      // was outside foreachPartition and therefore ran on the driver, where no
      // records had been produced — the executors' buffers were never flushed
      // before the offsets were committed, risking data loss on failure.
      KafkaClientUtil.flush()
    })
  }

  /**
   * Entry point: builds the streaming context, consumes the raw-log topic,
   * diverts each batch to the per-type topics, then commits the consumed
   * offsets (at-least-once: offsets are committed only after the batch has
   * been produced and flushed to Kafka).
   */
  def main(args: Array[String]): Unit = {

    context = new StreamingContext("local[*]", appName, Seconds(batchDuration))

    runApp {

      // Acquire the Kafka-backed input stream.
      val ds: InputDStream[ConsumerRecord[String, String]] = DStreamUtil.createDS(context, groupId, topic)

      // Classify and route every record of each non-empty batch.
      ds.foreachRDD(rdd => {

        if (!rdd.isEmpty()) {

          // Capture the batch's offset ranges BEFORE any transformation.
          val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

          // Write the diverted logs out to Kafka.
          parseLog(rdd)

          // Commit offsets only after the batch was produced (at-least-once).
          ds.asInstanceOf[CanCommitOffsets].commitAsync(ranges)
        }
      })
    }
  }
}
